# This code tracks the status of collections accounts in June 2009 that were
# opened in the past six months. Output data file is at account level.

#########################
###       Setup       ###
#########################
# Load packages

import pyspark.sql.functions as sqlf
import pandas as pd
import numpy  as np
import os
import sys
import shutil
from pyspark     import SparkContext, SparkConf
from pyspark.sql import SQLContext
from SparkDataTools2 import *

# Set spark working path
WORKDIR = '/geo_debt/geo/Spark'
CODEDIR = '/geo_debt/geo/Code/1_Spark/'
OUTDIR  = '/geo_debt/geo/Output/Table'


#cross walk
CW      = 'parquet.`/geo_debt/geo/cw/cw_zip_cz`'

#go to owrking directory
os.chdir(WORKDIR)



### some function for this specific exercise
def xxSelectItems(colltbID, headID, loopNum):
    result = ""
    i = 1
    while i<=loopNum:
        #99: dead; 0:dropped; 2: paidout but not dropped; 1: not paid out and not dropped.
        result = result + "CASE WHEN (" + headID + str(i) + ".subjectKey IS NULL) THEN 99 " +\
                              " WHEN (" + colltbID + str(i) + ".subjectKey IS NULL AND " + colltbID + str(i) + ".collectionKey IS NULL ) THEN 0 " +\
                              " WHEN (" + colltbID + str(i) + ".paidOutDate>=0) THEN 2 " +\
                              " ELSE 1 END AS coll_status" + str(i) +" "

        if i!=loopNum:
            result = result + ", "
        i = i+1
    return result

def xxJoinTable_subjectKey(join, tbName, tbID, loopNum):
    result = ""
    i = 1
    while i<=loopNum:
        result = result + " " + join + " " + tbName[i] + tbID + str(i) + " ON a.subjectKey=" + tbID + str(i) + ".subjectKey "
        i = i+1
    return result

def xxJoinTable_subANDcollKey(join, tbName, tbID, loopNum):
    result = ""
    i = 1
    while i<=loopNum:
        result = result + " " + join + " " + tbName[i] + tbID + str(i) + " ON a.subjectKey=" + tbID + str(i) + ".subjectKey AND " + "a.collectionKey=" + tbID + str(i) + ".collectionKey "
        i = i+1
    return result

#########################
###       Main        ###
#########################

startY = 2009
startM = 6

y = startY
m = startM
headersDataST = table_setup(sqlContext, y, m, 'headers')
headersDataST.createOrReplaceTempView("headersDataST")

collectionsDataST = table_setup(sqlContext, y, m, 'collections')
collectionsDataST.createOrReplaceTempView("collectionsDataST")

lastHY = y
lastHM = m-6
if lastHM <= 0:
    lastHY = lastHY - 1
    lastHM = lastHM + 12
halfYBefore = sql_date(lastHY,lastHM)


# collections accounts in June 2009 that were opened in the past six months
start_coll = "SELECT c.subjectKey, c.collectionKey " +\
             "FROM collectionsDataST c " +\
             "INNER JOIN headersDataST h ON c.subjectKey=h.subjectKey " +\
             "WHERE h.birthDate IS NOT NULL AND FLOOR((h.asOfDate - h.birthDate)/10000) BETWEEN 20 AND 80 " +\
             "AND c.highCreditAmount>100 AND (c.paidOutDate<=0 OR c.paidOutDate IS NULL) AND openDate > " + halfYBefore + " " +\
             "GROUP BY c.subjectKey, c.collectionKey "

coll_table_list = dict()
header_table_list = dict()
coll_filter = dict()
header_filter = dict()
for i in range(1,17):
    tmp_y = y
    tmp_m = m+6*i
    while tmp_m>12:
        tmp_m = tmp_m - 12
        tmp_y = tmp_y + 1
    while tmp_m<=0:
        tmp_m = tmp_m + 12
        tmp_y = tmp_y - 1
    ym = yyyymm(tmp_y,tmp_m)
    print("^^^ "+ym+" ^^^")
    coll_table_list[i] = table_setup(sqlContext, tmp_y, tmp_m, 'collections')
    coll_table_list[i].createOrReplaceTempView("coll_table_list"+str(i))

    coll_filter[i] = "(SELECT subjectKey, collectionKey, FIRST(paidOutDate) AS paidOutDate FROM coll_table_list"+str(i) +" GROUP BY subjectKey, collectionKey) "

    header_table_list[i] = table_setup(sqlContext, tmp_y, tmp_m, 'headers')
    header_table_list[i].createOrReplaceTempView("header_table_list"+str(i))

    header_filter[i] = "(SELECT subjectKey FROM header_table_list"+str(i)+") "

sql = "SELECT a.subjectKey, " + xxSelectItems("c","h",16) + " FROM (" +\
            start_coll + ") a " +\
            xxJoinTable_subANDcollKey("LEFT JOIN", coll_filter, "c", 16) +\
            xxJoinTable_subjectKey("LEFT JOIN", header_filter, "h", 16)

ym = yyyymm(y,m)
saveStata(sql,'coll_decay_'+ym, WORKDIR, OUTDIR)


sqlContext.clearCache()

print(' !!!! SUCCESS !!!!')
